
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;


public class State_BroadcastState {
    public static void main(String[] args) {
        //控制流发送到普通流后，普通流会收到一个广播状态
        //1.创建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputDS = env.socketTextStream("192.168.1.27", 8888);
        DataStreamSource<String> controlDS = env.socketTextStream("192.168.1.27", 9999);
        //TODO 1.把其中一条流(控制流) 广播出去
        //定义一个Map状态描述器,控制流会把这个状态广播出去
        MapStateDescriptor<String, String> broadcast = new MapStateDescriptor<>("boradcast-state", Types.STRING, Types.STRING);
        BroadcastStream<String> contrlBS = controlDS.broadcast(broadcast);

        //TODO 2.把另一条流和广播流关联起来
        BroadcastConnectedStream<String, String> inputBCS = inputDS.connect(contrlBS);

        //TODO 3.调用Process

        inputBCS.process(
                new BroadcastProcessFunction<String, String, String>() {
                    /*
                        获取广播状态，获取数据进行处理
                     */
                    @Override
                    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                        //TODO 5.通过上下文获取广播状态，取出里面的值
                        ReadOnlyBroadcastState<String, String> broadcastState = ctx.getBroadcastState(broadcast);
                        String aSwitch = broadcastState.get("switch");
                        if ("1".equals(aSwitch)) {
                            out.collect("切换到1的逻辑");
                        } else if ("2".equals(aSwitch)) {
                            out.collect("切换到2的逻辑");
                        }


                    }

                    /**
                     * 处理广播流的数据：这里主要定义，什么数据往广播状态存
                     * @param value
                     * @param ctx
                     * @param out
                     * @throws Exception
                     */
                    @Override
                    public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
                        // TODO 4.通过上下文获取广播状态,并往广播状态里存数据
                        BroadcastState<String, String> broadcastState = ctx.getBroadcastState(broadcast);
                        broadcastState.put("switch", value);
                    }
                }
        ).print();
        //提交任务
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}



